/*
 * Copyright 2016 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.util.concurrent;

import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.UnstableApi;

import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * {@link EventExecutorGroup} which will preserve {@link Runnable} execution
 * order but makes no guarantees about what {@link EventExecutor} (and therefore
 * {@link Thread}) will be used to execute the {@link Runnable}s.
 *
 * <p>
 * The {@link EventExecutorGroup#next()} for the wrapped
 * {@link EventExecutorGroup} must <strong>NOT</strong> return executors of type
 * {@link OrderedEventExecutor}.
 */
@UnstableApi
public final class NonStickyEventExecutorGroup implements EventExecutorGroup
{
    private final EventExecutorGroup group;

    private final int maxTaskExecutePerRun;

    /**
     * Creates a new instance. Be aware that the given
     * {@link EventExecutorGroup} <strong>MUST NOT</strong> contain any
     * {@link OrderedEventExecutor}s.
     */
    public NonStickyEventExecutorGroup(EventExecutorGroup group)
    {
        this(group, 1024);
    }

    /**
     * Creates a new instance. Be aware that the given
     * {@link EventExecutorGroup} <strong>MUST NOT</strong> contain any
     * {@link OrderedEventExecutor}s.
     */
    public NonStickyEventExecutorGroup(EventExecutorGroup group,
            int maxTaskExecutePerRun)
    {
        this.group = verify(group);
        this.maxTaskExecutePerRun = ObjectUtil
                .checkPositive(maxTaskExecutePerRun, "maxTaskExecutePerRun");
    }

    private static EventExecutorGroup verify(EventExecutorGroup group)
    {
        Iterator<EventExecutor> executors = ObjectUtil
                .checkNotNull(group, "group").iterator();
        while (executors.hasNext())
        {
            EventExecutor executor = executors.next();
            if (executor instanceof OrderedEventExecutor)
            {
                throw new IllegalArgumentException("EventExecutorGroup " + group
                        + " contains OrderedEventExecutors: " + executor);
            }
        }
        return group;
    }

    private NonStickyOrderedEventExecutor newExecutor(EventExecutor executor)
    {
        return new NonStickyOrderedEventExecutor(executor,
                maxTaskExecutePerRun);
    }

    @Override
    public boolean isShuttingDown()
    {
        return group.isShuttingDown();
    }

    @Override
    public Future<?> shutdownGracefully()
    {
        return group.shutdownGracefully();
    }

    @Override
    public Future<?> shutdownGracefully(long quietPeriod, long timeout,
            TimeUnit unit)
    {
        return group.shutdownGracefully(quietPeriod, timeout, unit);
    }

    @Override
    public Future<?> terminationFuture()
    {
        return group.terminationFuture();
    }

    @SuppressWarnings("deprecation")
    @Override
    public void shutdown()
    {
        group.shutdown();
    }

    @SuppressWarnings("deprecation")
    @Override
    public List<Runnable> shutdownNow()
    {
        return group.shutdownNow();
    }

    @Override
    public EventExecutor next()
    {
        return newExecutor(group.next());
    }

    @Override
    public Iterator<EventExecutor> iterator()
    {
        final Iterator<EventExecutor> itr = group.iterator();
        return new Iterator<EventExecutor>()
        {
            @Override
            public boolean hasNext()
            {
                return itr.hasNext();
            }

            @Override
            public EventExecutor next()
            {
                return newExecutor(itr.next());
            }

            @Override
            public void remove()
            {
                itr.remove();
            }
        };
    }

    @Override
    public Future<?> submit(Runnable task)
    {
        return group.submit(task);
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result)
    {
        return group.submit(task, result);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task)
    {
        return group.submit(task);
    }

    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay,
            TimeUnit unit)
    {
        return group.schedule(command, delay, unit);
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay,
            TimeUnit unit)
    {
        return group.schedule(callable, delay, unit);
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
            long initialDelay, long period, TimeUnit unit)
    {
        return group.scheduleAtFixedRate(command, initialDelay, period, unit);
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
            long initialDelay, long delay, TimeUnit unit)
    {
        return group.scheduleWithFixedDelay(command, initialDelay, delay, unit);
    }

    @Override
    public boolean isShutdown()
    {
        return group.isShutdown();
    }

    @Override
    public boolean isTerminated()
    {
        return group.isTerminated();
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException
    {
        return group.awaitTermination(timeout, unit);
    }

    @Override
    public <T> List<java.util.concurrent.Future<T>> invokeAll(
            Collection<? extends Callable<T>> tasks) throws InterruptedException
    {
        return group.invokeAll(tasks);
    }

    @Override
    public <T> List<java.util.concurrent.Future<T>> invokeAll(
            Collection<? extends Callable<T>> tasks, long timeout,
            TimeUnit unit) throws InterruptedException
    {
        return group.invokeAll(tasks, timeout, unit);
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException
    {
        return group.invokeAny(tasks);
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
            long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException
    {
        return group.invokeAny(tasks, timeout, unit);
    }

    @Override
    public void execute(Runnable command)
    {
        group.execute(command);
    }

    private static final class NonStickyOrderedEventExecutor extends
            AbstractEventExecutor implements Runnable, OrderedEventExecutor
    {
        private final EventExecutor executor;

        private final Queue<Runnable> tasks = PlatformDependent.newMpscQueue();

        private static final int NONE = 0;

        private static final int SUBMITTED = 1;

        private static final int RUNNING = 2;

        private final AtomicInteger state = new AtomicInteger();

        private final int maxTaskExecutePerRun;

        NonStickyOrderedEventExecutor(EventExecutor executor,
                int maxTaskExecutePerRun)
        {
            super(executor);
            this.executor = executor;
            this.maxTaskExecutePerRun = maxTaskExecutePerRun;
        }

        @Override
        public void run()
        {
            if (!state.compareAndSet(SUBMITTED, RUNNING))
            {
                return;
            }
            for (;;)
            {
                int i = 0;
                try
                {
                    for (; i < maxTaskExecutePerRun; i++)
                    {
                        Runnable task = tasks.poll();
                        if (task == null)
                        {
                            break;
                        }
                        safeExecute(task);
                    }
                }
                finally
                {
                    if (i == maxTaskExecutePerRun)
                    {
                        try
                        {
                            state.set(SUBMITTED);
                            executor.execute(this);
                            return; // done
                        }
                        catch (Throwable ignore)
                        {
                            // Reset the state back to running as we will keep
                            // on executing tasks.
                            state.set(RUNNING);
                            // if an error happened we should just ignore it and
                            // let the loop run again as there is not
                            // much else we can do. Most likely this was
                            // triggered by a full task queue. In this case
                            // we just will run more tasks and try again later.
                        }
                    }
                    else
                    {
                        state.set(NONE);
                        return; // done
                    }
                }
            }
        }

        @Override
        public boolean inEventLoop(Thread thread)
        {
            return false;
        }

        @Override
        public boolean inEventLoop()
        {
            return false;
        }

        @Override
        public boolean isShuttingDown()
        {
            return executor.isShutdown();
        }

        @Override
        public Future<?> shutdownGracefully(long quietPeriod, long timeout,
                TimeUnit unit)
        {
            return executor.shutdownGracefully(quietPeriod, timeout, unit);
        }

        @Override
        public Future<?> terminationFuture()
        {
            return executor.terminationFuture();
        }

        @Override
        public void shutdown()
        {
            executor.shutdown();
        }

        @Override
        public boolean isShutdown()
        {
            return executor.isShutdown();
        }

        @Override
        public boolean isTerminated()
        {
            return executor.isTerminated();
        }

        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit)
                throws InterruptedException
        {
            return executor.awaitTermination(timeout, unit);
        }

        @Override
        public void execute(Runnable command)
        {
            if (!tasks.offer(command))
            {
                throw new RejectedExecutionException();
            }
            if (state.compareAndSet(NONE, SUBMITTED))
            {
                // Actually it could happen that the runnable was picked up in
                // between but we not care to much and just
                // execute ourself. At worst this will be a NOOP when run() is
                // called.
                try
                {
                    executor.execute(this);
                }
                catch (Throwable e)
                {
                    // Not reset the state as some other Runnable may be added
                    // to the queue already in the meantime.
                    tasks.remove(command);
                    PlatformDependent.throwException(e);
                }
            }
        }
    }
}
